In [1]:
import pyspark
from pyspark.sql import SQLContext

# create spark contexts
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)

In [2]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import preproc as pp
# Register all the functions in Preproc with Spark Context
check_lang_udf = udf(pp.check_lang, StringType())
remove_stops_udf = udf(pp.remove_stops, StringType())
remove_features_udf = udf(pp.remove_features, StringType())
tag_and_remove_udf = udf(pp.tag_and_remove, StringType())
lemmatize_udf = udf(pp.lemmatize, StringType())
check_blanks_udf = udf(pp.check_blanks, StringType())

In [3]:
# Load a text file and convert each line to a Row.
data_rdd = sc.textFile("data/nlpdata/raw_classified.txt")
parts_rdd = data_rdd.map(lambda l: l.split("\t"))
# Filter bad rows out
garantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)
typed_rdd = garantee_col_rdd.map(lambda p: (p[0], p[1], float(p[2])))
#Create DataFrame
data_df = sqlContext.createDataFrame(typed_rdd, ["text", "id", "label"])
#data_df.show()
data_df.printSchema()


root
 |-- text: string (nullable = true)
 |-- id: string (nullable = true)
 |-- label: double (nullable = true)


In [4]:
data_df.show(4)


+--------------------+------------------+-----+
|                text|                id|label|
+--------------------+------------------+-----+
|Fresh install of ...|        1018769417|  1.0|
|Well. Now I know ...|       10284216536|  1.0|
|"Literally six we...|       10298589026|  1.0|
|Mitsubishi i MiEV...|109017669432377344|  1.0|
+--------------------+------------------+-----+
only showing top 4 rows


In [5]:
# predict language and filter out those with less than 90% chance of being English
lang_df = data_df.withColumn("lang", check_lang_udf(data_df["text"]))
en_df = lang_df.filter(lang_df["lang"] == "en")

In [6]:
en_df.printSchema()


root
 |-- text: string (nullable = true)
 |-- id: string (nullable = true)
 |-- label: double (nullable = true)
 |-- lang: string (nullable = true)


In [7]:
en_df.show(4)


+--------------------+------------------+-----+----+
|                text|                id|label|lang|
+--------------------+------------------+-----+----+
|RT @goeentertain:...|665305154954989568|  1.0|  en|
|Teforia Uses Mach...|660668007975268352|  1.0|  en|
|   Apple TV or Roku?|       25842461136|  1.0|  en|
|Finished http://t...|        9412369614|  1.0|  en|
+--------------------+------------------+-----+----+
only showing top 4 rows


In [8]:
# remove stop words to reduce dimensionality
rm_stops_df = en_df.withColumn("stop_text", remove_stops_udf(en_df["text"]))

In [9]:
rm_stops_df.printSchema()


root
 |-- text: string (nullable = true)
 |-- id: string (nullable = true)
 |-- label: double (nullable = true)
 |-- lang: string (nullable = true)
 |-- stop_text: string (nullable = true)


In [10]:
rm_stops_df.show(4)


+--------------------+------------------+-----+----+--------------------+
|                text|                id|label|lang|           stop_text|
+--------------------+------------------+-----+----+--------------------+
|RT @goeentertain:...|665305154954989568|  1.0|  en|RT @goeentertain:...|
|Teforia Uses Mach...|660668007975268352|  1.0|  en|Teforia Uses Mach...|
|   Apple TV or Roku?|       25842461136|  1.0|  en|      Apple TV Roku?|
|Finished http://t...|        9412369614|  1.0|  en|Finished http://t...|
+--------------------+------------------+-----+----+--------------------+
only showing top 4 rows


In [11]:
# remove other non essential words, think of it as my personal stop word list
rm_features_df = rm_stops_df.withColumn("feat_text", \
                                        remove_features_udf(rm_stops_df["stop_text"]))

In [12]:
rm_features_df.printSchema()


root
 |-- text: string (nullable = true)
 |-- id: string (nullable = true)
 |-- label: double (nullable = true)
 |-- lang: string (nullable = true)
 |-- stop_text: string (nullable = true)
 |-- feat_text: string (nullable = true)


In [13]:
rm_features_df.show(4)


+--------------------+------------------+-----+----+--------------------+--------------------+
|                text|                id|label|lang|           stop_text|           feat_text|
+--------------------+------------------+-----+----+--------------------+--------------------+
|RT @goeentertain:...|665305154954989568|  1.0|  en|RT @goeentertain:...|  future blase   ...|
|Teforia Uses Mach...|660668007975268352|  1.0|  en|Teforia Uses Mach...|teforia uses mach...|
|   Apple TV or Roku?|       25842461136|  1.0|  en|      Apple TV Roku?|         apple  roku|
|Finished http://t...|        9412369614|  1.0|  en|Finished http://t...|            finished|
+--------------------+------------------+-----+----+--------------------+--------------------+
only showing top 4 rows


In [14]:
# tag the words remaining and keep only Nouns, Verbs and Adjectives
tagged_df = rm_features_df.withColumn("tagged_text", \
                                      tag_and_remove_udf(rm_features_df.feat_text))

In [15]:
tagged_df.printSchema()


root
 |-- text: string (nullable = true)
 |-- id: string (nullable = true)
 |-- label: double (nullable = true)
 |-- lang: string (nullable = true)
 |-- stop_text: string (nullable = true)
 |-- feat_text: string (nullable = true)
 |-- tagged_text: string (nullable = true)


In [16]:
tagged_df.show(4)


+--------------------+------------------+-----+----+--------------------+--------------------+--------------------+
|                text|                id|label|lang|           stop_text|           feat_text|         tagged_text|
+--------------------+------------------+-----+----+--------------------+--------------------+--------------------+
|RT @goeentertain:...|665305154954989568|  1.0|  en|RT @goeentertain:...|  future blase   ...| future blase vic...|
|Teforia Uses Mach...|660668007975268352|  1.0|  en|Teforia Uses Mach...|teforia uses mach...| teforia uses mac...|
|   Apple TV or Roku?|       25842461136|  1.0|  en|      Apple TV Roku?|         apple  roku|         apple roku |
|Finished http://t...|        9412369614|  1.0|  en|Finished http://t...|            finished|           finished |
+--------------------+------------------+-----+----+--------------------+--------------------+--------------------+
only showing top 4 rows


In [17]:
# lemmatization of remaining words to reduce dimensionality & boost measures
lemm_df = tagged_df.withColumn("lemm_text", lemmatize_udf(tagged_df["tagged_text"]))

In [18]:
# lemmatization of remaining words to reduce dimensionality & boost measures
lemm_df = tagged_df.withColumn("lemm_text", lemmatize_udf(tagged_df["tagged_text"]))

In [19]:
# remove all rows containing only blank spaces
check_blanks_df = lemm_df.withColumn("is_blank", check_blanks_udf(lemm_df["lemm_text"]))
no_blanks_df = check_blanks_df.filter(check_blanks_df["is_blank"] == "False")
no_blanks_df.printSchema()


root
 |-- text: string (nullable = true)
 |-- id: string (nullable = true)
 |-- label: double (nullable = true)
 |-- lang: string (nullable = true)
 |-- stop_text: string (nullable = true)
 |-- feat_text: string (nullable = true)
 |-- tagged_text: string (nullable = true)
 |-- lemm_text: string (nullable = true)
 |-- is_blank: string (nullable = true)


In [20]:
# rename columns
no_blanks_df = no_blanks_df.withColumn("text",no_blanks_df.lemm_text)

In [21]:
# dedupe important since alot of the tweets only differed by url's and RT mentions
dedup_df = no_blanks_df.dropDuplicates(['text', 'label'])

In [22]:
# select only the columns we care about
data_set = dedup_df.select('id', 'text','label')

In [23]:
data_set.show(4)


+------------------+--------------------+-----+
|                id|                text|label|
+------------------+--------------------+-----+
|        1546813742|              dragon|  1.0|
|        1558492525|           hurt much|  1.0|
|383221484023709697|seth blog word se...|  1.0|
|660668007975268352|teforia use machi...|  1.0|
+------------------+--------------------+-----+
only showing top 4 rows


In [24]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data_set.randomSplit([0.6, 0.4])

In [25]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier 
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

In [26]:
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and nb.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
idf = IDF(minDocFreq=3, inputCol="features", outputCol="idf")

In [27]:
# 
nb = NaiveBayes()

In [28]:
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, nb])

In [29]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

In [30]:
# Make predictions.
predictions = model.transform(testData)

In [31]:
# Select example rows to display.
predictions.select("text", "label", "prediction").show(5)


+--------------------+-----+----------+
|                text|label|prediction|
+--------------------+-----+----------+
|           hurt much|  1.0|       1.0|
|teforia use machi...|  1.0|       1.0|
|              finish|  1.0|       1.0|
|future blase vice...|  1.0|       1.0|
|              divine|  1.0|       1.0|
+--------------------+-----+----------+
only showing top 5 rows


In [32]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)


Out[32]:
0.912655971479501

Cross Validation


In [33]:
#paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 1.0]).build()
# paramGrid = ParamGridBuilder().addGrid(rf.maxDepth,[4,8,10]).\
#                     addGrid(rf.impurity, ['entropy','gini']).build()


# cv = CrossValidator(estimator=pipeline, 
#                     estimatorParamMaps=paramGrid, 
#                     evaluator=MulticlassClassificationEvaluator(), 
#                     numFolds=4)
                    

# #training_df.show(5)  
# cvModel = cv.fit(training_df)

In [34]:
#prediction = cvModel.transform(test_df)